feat: implement StreamManager for gRPC subscription management#985
Merged
feat: implement StreamManager for gRPC subscription management#985
Conversation
Amp-Thread-ID: https://ampcode.com/threads/T-019c330f-5ffe-720e-a1d5-c702b78de081 Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c330f-e8f6-72c0-9e0e-93bbc1f1b9c2 Co-authored-by: Amp <amp@ampcode.com>
…scriptions Amp-Thread-ID: https://ampcode.com/threads/T-019c3702-6f33-74eb-89d9-fad10aac35ed Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c3830-1c06-7406-831c-fe2af2fd8314 Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c3838-288d-732e-bcf0-bafe984b350b Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c419c-5b98-774b-b9d6-40733b964154 Co-authored-by: Amp <amp@ampcode.com>
* master: chore: add subscription activation and per-program metrics (#929)
Amp-Thread-ID: https://ampcode.com/threads/T-019c5080-01ee-71fa-aa34-97264f3a0900 Co-authored-by: Amp <amp@ampcode.com>
Contributor
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs`:
- Around line 540-568: The review points out that the static
SIGNAL_CONNECTION_COUNT inside the generic async fn signal_connection_issue of
ChainLaserActor<H, S> is a single global counter shared across all generic
instantiations; if you intended per-actor throttling, remove the static and add
an AtomicU16 (or AtomicUsize) field to the actor struct (e.g.,
signal_connection_count) and update signal_connection_issue to read/update that
instance field instead of the static; keep the same logging call
(log_trace_debug) and the abort_sender.try_send logic but reference the new
per-instance field, and ensure the actor is constructed with the new field
initialized (e.g., AtomicU16::new(0)) and any other internal callers use the
instance field rather than SIGNAL_CONNECTION_COUNT.
In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`:
- Around line 518-525: The current logic takes self.program_sub (via
program_sub.take()) and then uses the ? operator on Self::update_subscriptions,
which on error returns early leaving self.program_sub as None and losing the
subscription state; change the flow so the original (subscribed_programs,
handle) tuple is restored into self.program_sub if update_subscriptions fails:
extract the tuple into a local variable, insert program_id and build the request
using Self::build_program_request, call Self::update_subscriptions(&handle,
"program_subscribe", request).await and, if it returns Err, reassign
self.program_sub = Some((subscribed_programs, handle)) before returning the
error (or use match/if let to only take ownership once update succeeds),
ensuring program_sub is not left None on failure.
- Around line 51-52: The blanket #[allow(unused)] on the StreamManagerConfig and
StreamManager impl blocks is too broad—narrow it by either moving test-only
helpers (current_new_sub_count, current_new_subs, unoptimized_old_stream_count,
optimized_old_stream_count, account_stream_count) into a #[cfg(test)] helper
module or placing #[allow(unused)] directly on each of those private test helper
functions; remove or tighten the module-level allow on StreamManagerConfig and
StreamManager so production-visible structs and impls still emit unused warnings
if they become dead code.
---
Duplicate comments:
In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mock.rs`:
- Around line 77-83: The close_stream method mutates stream_senders with
Vec::remove which shifts subsequent indices and breaks index-based access used
by push_update_to_stream; change stream_senders from Vec<Arc<...>> to
Vec<Option<Arc<...>>> (or switch to HashMap<usize, Arc<...>>), have
close_stream(idx) set senders[idx] = None instead of removing, and update
push_update_to_stream and any iteration logic to skip None entries (or adjust to
use the chosen HashMap key lookup) so stream indices remain stable; touch the
stream_senders declaration, close_stream, and push_update_to_stream functions to
implement this consistent None-preserving behavior.
In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`:
- Around line 162-192: The retry backoff in update_subscriptions is currently
linear but the PR description says exponential; change the backoff calculation
in update_subscriptions (use symbols MAX_RETRIES, retries, initial_retries,
backoff_ms) to exponential backoff: compute an attempt index (e.g., attempt =
initial_retries - retries) and set backoff_ms = BASE_MS * 2u64.pow(attempt as
u32) (optionally clamp with a MAX_BACKOFF_MS constant), then sleep for that
backoff; update the inline comment to say "exponential backoff" accordingly.
- Around line 311-332: has_account_subscriptions can disagree with
has_any_subscriptions because it only checks the handle vectors
(current_new_handle, unoptimized_old_handles, optimized_old_handles) while
has_any_subscriptions checks stream_map; update has_account_subscriptions to
also consult stream_map so they cannot disagree: change the implementation of
has_account_subscriptions to return true if any of the handle fields are present
OR if stream_map contains any entry whose key.source() indicates an account
stream (use the same key.source() logic as next_update). This keeps existing
handle checks and ensures terminated gRPC streams removed from stream_map are
reflected in the account-subscription query.
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs
Show resolved
Hide resolved
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs
Outdated
Show resolved
Hide resolved
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs
Outdated
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`:
- Around line 371-422: The optimize method can fail mid-loop leaving stream_map,
optimized_old_handles, unoptimized_old_handles, current_new_subs and
current_new_handle in an inconsistent state; fix by building new optimized
streams into temporary locals first (e.g., a Vec<LaserStreamWithHandle> and a
temporary map) using stream_factory.subscribe in the loop, and only swap them
into self.stream_map and self.optimized_old_handles after the entire loop
succeeds; ensure unoptimized_old_handles and current_new_* are only cleared
after swap, and on any subscribe error drop the temporaries so self retains its
original state (or perform an explicit rollback) to keep the invariants intact.
- Around line 232-257: The current code inserts pubkeys into self.subscriptions
and self.current_new_subs before calling Self::update_subscriptions or
self.insert_current_new_stream, which means a failed stream update leaves those
keys stuck; change the flow so the inserts into subscriptions and
current_new_subs happen only after the stream update succeeds (i.e., call
update_subscriptions/insert_current_new_stream first and only on Ok then insert
the pk entries), or if you prefer rollback semantics, capture the new_pks,
attempt the stream update, and on Err remove any entries you optimistically
inserted; update the logic around update_subscriptions,
insert_current_new_stream, subscriptions, current_new_subs, and
account_subscribe accordingly so keys are never added to
subscriptions/current_new_subs unless the stream update completed successfully.
---
Duplicate comments:
In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs`:
- Around line 541-569: The function signal_connection_issue currently uses a
static SIGNAL_CONNECTION_COUNT (AtomicU16) that is shared across all generic
instantiations; to make throttling per-actor instead, remove the static and add
a per-instance counter field (e.g., signal_connection_count: AtomicU16 or
Arc<AtomicU16>) to the ChainLaserActor struct, initialize it in the actor
constructor/new, update signal_connection_issue to access the instance field
(convert it from a static fn to an instance method or pass the counter as an
argument) and replace all uses of SIGNAL_CONNECTION_COUNT with the new field;
ensure any call sites that invoked the static function are updated to call the
instance method or pass the instance counter.
In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`:
- Around line 542-549: The code takes self.program_sub (program_sub.take()) then
calls Self::update_subscriptions which may fail and return early, leaving
self.program_sub None and dropping the handle/subscriptions; instead, preserve
the original (subscribed_programs, handle) in a local variable or use
mem::replace, perform the insert/build/request and await
Self::update_subscriptions, and only assign back to self.program_sub =
Some((subscribed_programs, handle)) after update_subscriptions succeeds; if
update_subscriptions returns Err, ensure you restore the original tuple into
self.program_sub before propagating the error so subscriptions/handle are not
lost.
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs
Outdated
Show resolved
Hide resolved
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs
Show resolved
Hide resolved
* master: feat: Add ScheduleCommitFinalize to program-api only (#989)
Amp-Thread-ID: https://ampcode.com/threads/T-019c88cc-aa84-7031-ab78-acc63e1fcaea Co-authored-by: Amp <amp@ampcode.com>
* master: fix: wait for valid blockhash (#992)
bmuddha
reviewed
Feb 23, 2026
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs
Show resolved
Hide resolved
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_factory.rs
Outdated
Show resolved
Hide resolved
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_factory.rs
Show resolved
Hide resolved
bmuddha
approved these changes
Feb 23, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements a comprehensive StreamManager to manage multiple gRPC account and program
subscription streams with automatic optimization.
The solution handles subscription lifecycle, stream promotion/optimization when limits are exceeded, and unified
polling of updates from all streams.
Added two metrics to see optimization behavior:
mbv_grpc_optimized_streams_gaugegrpc_unoptimized_streams_gaugeCLOSES: #949
Details
The PR introduces the
StreamManagerstruct which separates stream management logic from the chain laser actor. This enables better testability and cleaner separation of concerns.Key Features
Subscription Tracking: Maintains a canonical set of active account subscriptions across multiple stream generations using a shared
SharedSubscriptionsHashSet.Stream Generational Management: Organizes account streams into three generations:
max_subs_in_newthresholdmax_subs_in_old_optimizedsized groupsAutomatic Optimization: When unoptimized old streams exceed
max_old_unoptimizedlimit, all account subscriptions are rebuilt into optimized chunks.Update Subscription Retry Logic: Implements linear backoff (up to 5 retries) when writing subscription updates to stream handles fails.
Unified Stream Polling: All streams (account and program) are maintained in a
StreamMapand polled together vianext_update()which returns updates tagged with their source type.Program Subscriptions: Independent program subscription management that can coexist with account subscriptions.
Testing
Comprehensive test suite covering:
Error Handling
Added
GrpcSubscriptionUpdateFailederror variant to handle gRPC subscription update failures with retry context.Summary by CodeRabbit
New Features
Bug Fixes
Tests
Metrics
Architecture Improvements